package com.example.mq.config;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

import java.util.HashMap;
import java.util.Map;

/**
 * Spring configuration that exposes a {@link KafkaTemplate} bean whose producer
 * settings are read from the injected {@code Globals} object.
 *
 * <p>The class also turns on {@code @EnableKafka} and enables the binding
 * declared by {@code MsgBinding}.
 *
 * <p>Created by liulanhua on 2018/8/31.
 */
@Slf4j
@Configuration
@EnableKafka
@EnableBinding(value = {MsgBinding.class})
public class KafkaProducerConfig {

    /** Source of the producer settings (servers, retries, sizes, serializers). */
    @Autowired
    private Globals globals;

    /**
     * Builds the Kafka producer properties from {@code globals}.
     *
     * <p>A fresh, mutable map is created on every call. {@code linger.ms} is not
     * configured here, so the Kafka client's default applies.
     *
     * @return producer properties keyed by {@link ProducerConfig} constant names
     */
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, globals.getServers());
        props.put(ProducerConfig.RETRIES_CONFIG, globals.getRetries());
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, globals.getBatchSize());
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, globals.getBufferMemory());
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, globals.getKeySerializer());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, globals.getValSerializer());
        return props;
    }

    /**
     * Creates a new producer factory backed by {@link #producerConfigs()}.
     *
     * <p>This method is not a bean method; each direct call builds a new factory.
     * The key and value types are not checked against the serializers configured
     * in {@code globals}; the caller is responsible for choosing types that match.
     *
     * @param <K> record key type
     * @param <T> record value type
     * @return a new {@link DefaultKafkaProducerFactory}
     */
    // The first type parameter used to be named "String", which shadowed
    // java.lang.String and made the key type look fixed when it was not.
    public <K, T> ProducerFactory<K, T> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    /**
     * Exposes a {@link KafkaTemplate} bean that sends through
     * {@link #producerFactory()}.
     *
     * @param <K> record key type
     * @param <T> record value type
     * @return the Kafka template bean
     */
    @Bean
    public <K, T> KafkaTemplate<K, T> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

}
